package cn.iocoder.springboot.lab31.rocketmqdemo.consumer;


import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.remoting.RPCHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates and starts two RocketMQ push consumers, one per
 * (consumer group, topic) pair. Both consumers share the same name server, namespace,
 * ACL credentials and tuning parameters; only the group, topic and listener differ.
 */
@Configuration
public class RocketMqPushConfig {

    // SECURITY NOTE(review): the endpoint and ACL credentials below are hard-coded in source.
    // They should be moved to external configuration (environment / Spring properties /
    // secret store) and the exposed keys rotated. Values are kept unchanged here so that
    // runtime behavior is identical.
    private static final String NAME_SERVER =
            "rocketmq-pbrn97ajnqq7.tdmq-rocketmq.ap-gz.public.tencenttdmq.com:9876";
    private static final String NAMESPACE = "rocketmq-pbrn97ajnqq7|etc_stg2";
    private static final String ACCESS_KEY = "eyJrZXlJZCI6InJvY2tldG1xLXBicm45N2FqbnFxNyIsImFsZyI6IkhTMjU2In0.eyJzdWIiOiJyb2NrZXRtcS1wYnJuOTdham5xcTdfcm9ja2V0LWZ1bGwtb3BlcmF0b3Itcm9sZSJ9.OIQQS0J8Hoa1VhIhGTYp-W008TYcjAvxfxRibjWoGFQ";
    private static final String SECRET_KEY = "rocket-full-operator-role";

    private static final String DATA_PUSH_TOPIC = "test-topicpush";
    private static final String DATA_PUSH_TOPIC_2 = "test-topicpush2";

    private static final String DATA_PUSH_GROUP = "test-grouppush";
    private static final String DATA_PUSH_GROUP_2 = "test-grouppush2";

    // Change with care. NOTE(review): the original comments said these should stay at 1,
    // but the code has always set 2 — confirm which value is intended.
    private static final int PULL_BATCH_SIZE = 2;
    private static final int CONSUME_MESSAGE_BATCH_MAX_SIZE = 2;

    /** Value passed to {@code setMaxReconsumeTimes} on every consumer. */
    private static final int MAX_RECONSUME_TIMES = 20;

    /**
     * Consumer for gas-station information push: group {@code test-grouppush} subscribed to
     * topic {@code test-topicpush} with all tags.
     *
     * <p>Subscription-consistency note carried over from the original comment (presumably:
     * consumers sharing one group must not subscribe to different topics — TODO confirm),
     * which is why each topic gets its own consumer group.
     *
     * @param dataPushListener listener that receives the consumed messages
     * @return the consumer, already started
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean("dataPushConsumer")
    public DefaultMQPushConsumer dataPushConsumer(MessageListenerConcurrently dataPushListener) throws MQClientException {
        return buildAndStartConsumer(DATA_PUSH_GROUP, DATA_PUSH_TOPIC, dataPushListener);
    }

    /**
     * Second push consumer: group {@code test-grouppush2} subscribed to topic
     * {@code test-topicpush2} with all tags.
     *
     * @param dataPushListener2 listener that receives the consumed messages
     * @return the consumer, already started
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @Bean("dataPushConsumer2")
    public DefaultMQPushConsumer dataPushConsumer2(MessageListenerConcurrently dataPushListener2) throws MQClientException {
        return buildAndStartConsumer(DATA_PUSH_GROUP_2, DATA_PUSH_TOPIC_2, dataPushListener2);
    }

    /**
     * Builds a push consumer with the shared settings, subscribes it to {@code topic} for all
     * tags, registers {@code listener} and starts it.
     *
     * @param consumerGroup consumer group the consumer joins
     * @param topic         topic to subscribe to (tag expression {@code "*"})
     * @param listener      concurrent message listener to register
     * @return the started consumer
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    private static DefaultMQPushConsumer buildAndStartConsumer(String consumerGroup, String topic,
            MessageListenerConcurrently listener) throws MQClientException {
        // 1. ACL hook that signs requests with the access/secret key pair.
        RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));

        // 2. The constructor already sets the consumer group, so no separate
        //    setConsumerGroup call is needed. The trailing (true, null) arguments are the
        //    message-trace flag and the customized trace topic.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(NAMESPACE, consumerGroup, rpcHook,
                new AllocateMessageQueueAveragely(), true, null);
        consumer.setNamesrvAddr(NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 3. Subscribe to the topic; "*" subscribes to every tag. Narrow the expression here
        //    if tag filtering is ever needed.
        consumer.subscribe(topic, "*");
        consumer.setPullBatchSize(PULL_BATCH_SIZE);
        consumer.setConsumeMessageBatchMaxSize(CONSUME_MESSAGE_BATCH_MAX_SIZE);
        System.out.println("Suspend current queue time set to: " + consumer.getSuspendCurrentQueueTimeMillis() + " ms");

        // 4. Register the MessageListenerConcurrently implementation on the consumer.
        consumer.registerMessageListener(listener);
        consumer.setMaxReconsumeTimes(MAX_RECONSUME_TIMES);

        // 5. Start the consumer.
        consumer.start();
        return consumer;
    }
}
